package com.bodystm.server;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;

import com.bodystm.bean.datadeal.BloodPressureAnalysis;
import com.bodystm.bean.datadeal.BreathRateAnalysis;
import com.bodystm.bean.datadeal.EcgAnalysis;
import com.bodystm.bean.datadeal.OximetryAnalysis;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;

/**
 * websocketServer,用来管理所有的socket连接，发送数据到客户端
 * @author ehl
 *
 */
public class WebSocketServer  implements IWebSocketService, IHttpService, Runnable{
	/*public static void main(String[] args) {  
        new WebSocketServer(9999).start();  
    }*/ 
	private static final Logger logger = Logger.getLogger(WebSocketServer.class); 
	// handshaker attachment key  
    private static final AttributeKey<WebSocketServerHandshaker> ATTR_HANDSHAKER = AttributeKey.newInstance("ATTR_KEY_CHANNELID");
    private static final int MAX_CONTENT_LENGTH = 65536;  
    private static final String WEBSOCKET_UPGRADE = "websocket";  
    private static final String WEBSOCKET_CONNECTION = "Upgrade";  
    private static final String WEBSOCKET_URI_ROOT_PATTERN = "ws://%s:%d";
    
    private static final String HN_HTTP_CODEC = "HN_HTTP_CODEC";  
    private static final String HN_HTTP_AGGREGATOR = "HN_HTTP_AGGREGATOR";  
    private static final String HN_HTTP_CHUNK = "HN_HTTP_CHUNK";  
    private static final String HN_SERVER = "HN_LOGIC"; 
    // ------------------------ member fields -----------------------  
    private String host; // 绑定的ip地址  
    private int port; // 绑定的端口
    /** 
     * 保存所有WebSocket连接 
     */  
    private Map<ChannelId, Channel> channelMap = new ConcurrentHashMap<ChannelId, Channel>(); 
    public Map<String, Channel> channelsMap = new ConcurrentHashMap<String, Channel>();
    /**
     * 用于血氧实时波形数据的传输
     */
    public Map<String, Channel> channelsMap4Ecg = new ConcurrentHashMap<String, Channel>();
    public Map<String, Channel> channelsMap4Oximetry = new ConcurrentHashMap<String, Channel>();
    public Map<String, Channel> channelsMap4PBP = new ConcurrentHashMap<String, Channel>();
    public Map<String, Channel> channelsMap4RESP = new ConcurrentHashMap<String, Channel>();
    
    public Map<String, ArrayList<Channel>> channelsMap4Ecg2 = new ConcurrentHashMap<String, ArrayList<Channel>>();
    public Map<String, ArrayList<Channel>> channelsMap4Oximetry2 = new ConcurrentHashMap<String, ArrayList<Channel>>();
    public Map<String, ArrayList<Channel>> channelsMap4PBP2 = new ConcurrentHashMap<String, ArrayList<Channel>>();
    public Map<String, ArrayList<Channel>> channelsMap4RESP2 = new ConcurrentHashMap<String, ArrayList<Channel>>();
    //用于存储channelid和mac的对应关系，以便websocket断开连接时便于查找channel并删除
    private Map<String, String> hashID_MAC=new HashMap<String, String>();
    private  String WEBSOCKET_URI_ROOT;//final
    public WebSocketServer(int port) {  
        this("localhost", port);
    }  
      
    public WebSocketServer(String host, int port) {  
        this.host = host;  
        this.port = port;  
        WEBSOCKET_URI_ROOT = String.format(WEBSOCKET_URI_ROOT_PATTERN, host, port);  
    } 
    public void start() {  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        ServerBootstrap b = new ServerBootstrap();  
        b.group(bossGroup, workerGroup);  
        b.channel(NioServerSocketChannel.class);  
        b.childHandler(new ChannelInitializer<Channel>() {  
            @Override  
            protected void initChannel(Channel ch) throws Exception {  
                ChannelPipeline pl = ch.pipeline();  
                // 保存该Channel的引用  
                //channelMap.put(ch.id(), ch);  
                logger.info("new channel:"+ch);  
                ch.closeFuture().addListener(new ChannelFutureListener() {  
                      
                    public void operationComplete(ChannelFuture future) throws Exception {  
                        logger.info("channel close "+future.channel());  
                        // Channel 关闭后不再引用该Channel  
                        /*channelMap.remove(future.channel().id());  
                        channelsMap.remove(future.channel().id());
                        channelsMap4Ecg.remove(future.channel().id());
                        channelsMap4Oximetry.remove(future.channel().id());
                        channelsMap4PBP.remove(future.channel().id());
                        channelsMap4RESP.remove(future.channel().id());*/
                        //断开连接后清除相关数据
                        String strChannelId=future.channel().id().toString();
                        if (hashID_MAC.containsKey(strChannelId)) {
                        	String mac=hashID_MAC.get(strChannelId);
                        	hashID_MAC.remove(strChannelId);
                        	ArrayList<Channel> arr=channelsMap4Ecg2.get(mac);
                        	if (arr!=null) {
								for (Channel ch : arr) {
									if (ch.id().toString().equals(strChannelId)) {
										arr.remove(ch);
										return;
									}
								}
							}
                        	arr=channelsMap4Oximetry2.get(mac);
                        	if (arr!=null) {
								for (Channel ch : arr) {
									if (ch.id().toString().equals(strChannelId)) {
										arr.remove(ch);
										return;
									}
								}
							}
                        	arr=channelsMap4PBP2.get(mac);
                        	if (arr!=null) {
								for (Channel ch : arr) {
									if (ch.id().toString().equals(strChannelId)) {
										arr.remove(ch);
										return;
									}
								}
							}
                        	arr=channelsMap4RESP2.get(mac);
                        	if (arr!=null) {
								for (Channel ch : arr) {
									if (ch.id().toString().equals(strChannelId)) {
										arr.remove(ch);
										return;
									}
								}
							}
						}
                    }  
                });  
  
                pl.addLast(HN_HTTP_CODEC, new HttpServerCodec());  
                pl.addLast(HN_HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_CONTENT_LENGTH));  
                pl.addLast(HN_HTTP_CHUNK, new ChunkedWriteHandler());  
                pl.addLast(HN_SERVER, new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this));  
            }  
  
        });  
  
        try {  
            // 绑定端口  
            ChannelFuture future = b.bind(host, port).addListener(new ChannelFutureListener() {  
  
                public void operationComplete(ChannelFuture future) throws Exception {  
                    if (future.isSuccess()) {  
                        logger.info("websocket started.");  
                    }  
                }  
            }).sync();  
              
            future.channel().closeFuture().addListener(new ChannelFutureListener() {  
  
                public void operationComplete(ChannelFuture future) throws Exception {  
                    logger.info("server channel "+future.channel()+" closed.");  
                }  
  
            }).sync();  
        } catch (InterruptedException e) {  
            logger.error(e.toString());  
        } finally {  
            bossGroup.shutdownGracefully();  
            workerGroup.shutdownGracefully();  
        }  
        logger.info("websocket server shutdown");  
    }  
  
    /*  
     * @see cc.lixiaohui.demo.netty4.websocket.IHttpService#handleHttpRequest(io.netty.handler.codec.http.FullHttpRequest) 
     */  
    public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {  
        if (isWebSocketUpgrade(req)) { // 该请求是不是websocket upgrade请求   
            logger.info("upgrade to websocket protocol");  
              
            String subProtocols = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);  
              
            WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(WEBSOCKET_URI_ROOT, subProtocols, false);  
            WebSocketServerHandshaker handshaker = factory.newHandshaker(req);  
              
            if (handshaker == null) {// 请求头不合法, 导致handshaker没创建成功  
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());  
            } else {  
                // 响应该请求  
                handshaker.handshake(ctx.channel(), req);  
                // 把handshaker 绑定给Channel, 以便后面关闭连接用  
                ctx.channel().attr(ATTR_HANDSHAKER).set(handshaker);// attach handshaker to this channel  
            }  
            return;  
        }  
          
        // TODO 忽略普通http请求  
        logger.info("ignoring normal http request");  
    }  
      
    /* 
     * @see 
     * cc.lixiaohui.demo.netty4.websocket.IWebSocketService#handleFrame(io.netty 
     * .channel.Channel, io.netty.handler.codec.http.websocketx.WebSocketFrame) 
     */  
    public void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {  
        try {
			// text frame  
			if (frame instanceof TextWebSocketFrame) {  
			    String text = ((TextWebSocketFrame) frame).text();  
			    TextWebSocketFrame rspFrame = new TextWebSocketFrame(text);  
			    logger.info("recieve TextWebSocketFrame from channel "+ ctx.channel());  
			    // 发给其他所有channel  
			    /*for (Channel ch : channelMap.values()) {  
			        if (ctx.channel().equals(ch)) {   
			            continue;   
			        }  
			        ch.writeAndFlush(rspFrame);  
			        logger.info("write text["+text+"] to channel "+ch);  
			    }  */
			    // 保存该Channel的引用  
			    if (text.startsWith("/connectws/")) {//mac:
			    	String[] strs=text.split("/");
			    	String flag=strs[2];
			    	String mac=strs[3];
			    	hashID_MAC.put(ctx.channel().id().toString(), mac);
			    	switch (flag) {
					case "00"://血氧波形
//					channelsMap4Oximetry.put(mac, ctx.channel()); 
						if (!channelsMap4Oximetry2.containsKey(mac)) {
							channelsMap4Oximetry2.put(mac, new ArrayList<Channel>());
						}
						channelsMap4Oximetry2.get(mac).add(ctx.channel());
						break;
					case "01"://心电数据
//					channelsMap4Ecg.put(mac, ctx.channel()); 
//					channelsMap.put("userid", ctx.channel());//尝试同一个页面中多个心电设备共用一个websocket
						if (!channelsMap4Ecg2.containsKey(mac)) {
							channelsMap4Ecg2.put(mac, new ArrayList<Channel>());
						}
						channelsMap4Ecg2.get(mac).add(ctx.channel());
						break;
					case "02"://连续血压数据
//					channelsMap4PBP.put(mac, ctx.channel()); 
						if (!channelsMap4PBP2.containsKey(mac)) {
							channelsMap4PBP2.put(mac, new ArrayList<Channel>());
						}
						channelsMap4PBP2.get(mac).add(ctx.channel());
						break;
					case "03"://呼吸数据
//					channelsMap4RESP.put(mac, ctx.channel()); 
						if (!channelsMap4RESP2.containsKey(mac)) {
							channelsMap4RESP2.put(mac, new ArrayList<Channel>());
						}
						channelsMap4RESP2.get(mac).add(ctx.channel());
						break;
					default:
						break;
					}
			    	//channelsMap.put(mac, ctx.channel()); 
			    	//ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到数据："+text));
				}else if (text.startsWith("/TargetSampleRate/")) {
					String[] strs=text.split("/");
					String flag=strs[2];
					int targetSampleRate=Integer.parseInt(strs[3]);
			    	switch (flag) {
					case "00"://血氧波形
						OximetryAnalysis.npTargetSampleRate=targetSampleRate;
						break;
					case "01"://心电数据
						EcgAnalysis.npTargetSampleRate=targetSampleRate;
						break;
					case "02"://连续血压数据
						BloodPressureAnalysis.npTargetSampleRate=targetSampleRate;
						break;
					case "03"://呼吸数据
						BreathRateAnalysis.npTargetSampleRate=targetSampleRate; 
						break;
					default:
						break;
					}
				}
			    return;  
			}  
			  
			// ping frame, 回复pong frame即可  
			if (frame instanceof PingWebSocketFrame) {  
			    logger.info("recieve PingWebSocketFrame from channel "+ ctx.channel());  
			    ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));  
			    return;  
			}  
			  
			if (frame instanceof PongWebSocketFrame) {  
			    logger.info("recieve PongWebSocketFrame from channel "+ ctx.channel());  
			    return;  
			}  
			// close frame,   
			if (frame instanceof CloseWebSocketFrame) {  
			    logger.info("recieve CloseWebSocketFrame from channel "+ ctx.channel());  
			    WebSocketServerHandshaker handshaker = ctx.channel().attr(ATTR_HANDSHAKER).get();  
			    if (handshaker == null) {  
			        logger.error("channel "+ctx.channel()+" have no HandShaker");  
			        return;  
			    }  
			    handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());  
			    return;  
			}  
			// 剩下的是binary frame, 忽略  
			logger.info("unhandle binary frame from channel "+ ctx.channel());
		} catch (NumberFormatException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage(),e.fillInStackTrace());
		}  
    }  
      
    //三者与：1.GET? 2.Upgrade头 包含websocket字符串?  3.Connection头 包含 Upgrade字符串?  
    private boolean isWebSocketUpgrade(FullHttpRequest req) {  
        HttpHeaders headers = req.headers();  
        return req.method().equals(HttpMethod.GET)   
                && headers.get(HttpHeaderNames.UPGRADE).contains(WEBSOCKET_UPGRADE)  
                && headers.get(HttpHeaderNames.CONNECTION).contains(WEBSOCKET_CONNECTION);  
    } 
    /**
     * 向客户端（浏览器）发送数据（心电、血氧等数据）
     * @param mac
     * @param data
     */
    public void sendMessage(Object mac,Object data){
    	if (!channelsMap.containsKey(mac)) {
			return;
		}
    	Channel channel = channelsMap.get(mac);
    	if (null!=channel) {
    		//channel.writeAndFlush(data);
    		channel.writeAndFlush(new TextWebSocketFrame("服务器收到数据："+data));
		}
    	
    	
    }

	@Override
	public void run() {
		// TODO Auto-generated method stub
		
	}
	/**
	 * 发送响应数据
	 * 2018年1月20日13:05:33
	 * 暂时未用到，未测试可用性
	 * @param ctx
	 * @param req
	 * @param res
	 */
	private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
		// 返回应答给客户端
		if (res.getStatus().code() != 200) {
			ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
			res.content().writeBytes(buf);
			buf.release();
			HttpUtil.setContentLength(res, res.content().readableBytes());
		}

		// 如果是非Keep-Alive，关闭连接
		ChannelFuture f = ctx.channel().writeAndFlush(res);
		if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
			f.addListener(ChannelFutureListener.CLOSE);
		}
	}
}
